// Copyright (c) 2016 Chef Software Inc. and/or applicable contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fs::{self, File};
use std::io::BufReader;
use std::io::prelude::*;
use std::net::SocketAddrV4;
use std::path::PathBuf;
use std::process::{Command, Stdio, Child};
use std::str::FromStr;
use std::thread;

use libc::{pid_t, c_int};
use time::{Duration, SteadyTime};

use error::Result;
use hcore;
use hcore::package::PackageIdent;
use hsup::package::Package;
use hsup::supervisor::{WEXITSTATUS, WIFEXITED, WIFSIGNALED, WTERMSIG, Pid};
use hsup::util::signals;
use super::ServiceDef;
use super::error::Error;

const HAB_SUP_PATH: &'static str = "/src/components/sup/target/debug/hab-sup";
static LOGKEY: &'static str = "TASK";

// Functions from POSIX libc.
extern "C" {
    fn waitpid(pid: pid_t, status: *mut c_int, options: c_int) -> pid_t;
}


/// Where and with what command a Task runs
/// This is most useful for testing.
#[derive(Debug, Clone)]
pub struct ExecContext {
    pub sup_path: PathBuf,
    pub service_root: PathBuf,
}

impl ExecContext {
    pub fn new(sup_path: PathBuf, service_root: PathBuf) -> ExecContext {
        ExecContext {
            sup_path: sup_path,
            service_root: service_root,
        }
    }
}


impl Default for ExecContext {
    /// use the latest hab-sup and the default Hab service root
    /// Fall back to HAB_SUP_PATH if we can't find it.
    /// Run hab-director with `RUST_LOG=habitat_director=debug`
    /// to see the start params including hab-sup path
    fn default() -> ExecContext {
        if let Ok(ident) = PackageIdent::from_str("core/hab-sup") {
            if let Ok(pkg) = Package::load(&ident, None) {
                let mut hab_director = pkg.path().to_path_buf();
                hab_director.push("bin");
                hab_director.push("hab-sup");
                ExecContext::new(hab_director, hcore::fs::svc_root())
            } else {
                println!("Can't find core/hab-sup, falling back to {}", &HAB_SUP_PATH);
                ExecContext::new(PathBuf::from(HAB_SUP_PATH), hcore::fs::svc_root())
            }
        } else {
            println!("Can't find core/hab-sup, falling back to {}", &HAB_SUP_PATH);
            ExecContext::new(PathBuf::from(HAB_SUP_PATH), hcore::fs::svc_root())
        }
    }
}

/// Values for a Task that are generated by the Controller.
/// These values will be unique per Task.
#[derive(Debug, Clone)]
pub struct ExecParams {
    pub gossip_listen: SocketAddrV4,
    pub sidecar_listen: SocketAddrV4,
    pub initial_peer: Option<SocketAddrV4>,
}


impl ExecParams {
    pub fn new(gossip_listen: SocketAddrV4,
               sidecar_listen: SocketAddrV4,
               initial_peer: Option<SocketAddrV4>)
               -> ExecParams {
        ExecParams {
            gossip_listen: gossip_listen,
            sidecar_listen: sidecar_listen,
            initial_peer: initial_peer,
        }
    }
}


/// A Task watches a child hab-sup process (a "supervisor supervisor").
/// It knows how to generate CLI args for `hab-sup start` based on
/// it's ServiceDef, ExecContext, and ExecParams.
#[derive(Debug)]
pub struct Task {
    pub pid: Option<Pid>,
    pub service_def: ServiceDef,
    pub exec_ctx: ExecContext,
    pub exec_params: ExecParams,
    pub state_entered: SteadyTime,
    pub starts: u64,
}

impl Task {
    pub fn new(exec_ctx: ExecContext, exec_params: ExecParams, service_def: ServiceDef) -> Task {
        Task {
            pid: None,
            service_def: service_def,
            exec_ctx: exec_ctx,
            exec_params: exec_params,
            state_entered: SteadyTime::now(),
            starts: 0,
        }
    }

    pub fn status(&self) -> String {
        let desc = if self.is_up() { "Started" } else { "Stopped" };
        format!("{}: {} for {}",
                self.service_def.to_string(),
                desc,
                SteadyTime::now() - self.state_entered)
    }

    pub fn get_cmd_args(&self) -> Result<Vec<String>> {
        let mut args = match self.service_def.cli_args.as_ref() {
            Some(s) => s.as_str().split(" ").map(|s| s.to_string()).collect::<Vec<String>>(),
            None => Vec::new(),
        };

        args.insert(0, "start".to_string());
        args.insert(1, self.service_def.ident.to_string());
        args.push("--listen-peer".to_string());
        args.push(self.exec_params.gossip_listen.to_string());
        args.push("--listen-http".to_string());
        args.push(self.exec_params.sidecar_listen.to_string());
        args.push("--group".to_string());
        args.push(self.service_def.service_group.group.clone());
        if let Some(org) = self.service_def.service_group.organization.as_ref() {
            args.push("--org".to_string());
            args.push(org.clone());
        }
        if let Some(initial_peer) = self.exec_params.initial_peer {
            args.push("--peer".to_string());
            args.push(initial_peer.to_string());
        }

        // remove whitespace, it messes up the Command args
        args.retain(|s| s != "");
        for arg in &args {
            debug!("ARG [{}]", &arg);
        }
        Ok(args)
    }

    pub fn start(&mut self) -> Result<()> {
        if self.pid.is_none() {
            let args = try!(self.get_cmd_args());

            debug!("hab-up exe = [{}]",
                   &self.exec_ctx.sup_path.to_string_lossy());

            outputln!("Starting {} [gossip {}, http API: {}, peer: {}]",
                      &self.service_def.to_string(),
                      &self.exec_params.gossip_listen,
                      &self.exec_params.sidecar_listen,
                      &self.exec_params.initial_peer
                        .as_ref()
                        .map_or("None".to_string(), |v| v.to_string()),
                      );

            let mut cmd = Command::new(&self.exec_ctx.sup_path);
            // rebind to increase lifetime and make the compiler happy
            let mut cmd = cmd.args(&args)
                .stdin(Stdio::null())
                .stdout(Stdio::piped())
                .stderr(Stdio::piped());

            for (k, v) in &self.service_def.env {
                cmd.env(&k, &v);
                debug!("ENV {}={}", &k, &v);
            }

            let mut child = try!(cmd.spawn());
            self.pid = Some(child.id());

            outputln!("Started {} [gossip {}, http API: {}, peer: {}, pid: {}]",
                      &self.service_def.to_string(),
                      &self.exec_params.gossip_listen,
                      &self.exec_params.sidecar_listen,
                      &self.exec_params
                          .initial_peer
                          .as_ref()
                          .map_or("None".to_string(), |v| v.to_string()),
                      &child.id());

            try!(self.transition_to_started());
            let name = self.service_def.to_string();
            try!(thread::Builder::new()
                .name(String::from(name.clone()))
                .spawn(move || -> Result<()> { child_reader(&mut child, name) }));
            debug!("Spawned child reader");

        } else {
            outputln!("{} already started", &self.service_def.to_string());
        }
        Ok(())
    }

    pub fn is_up(&self) -> bool {
        self.pid.is_some()
    }

    pub fn is_down(&self) -> bool {
        self.pid.is_none()
    }

    // if the child process exists, check it's status via waitpid().
    // Returns true if the process is still running, false if it has died.
    pub fn check_process(&mut self) -> Result<()> {
        if self.pid.is_none() {
            return Ok(());
        }

        unsafe {
            let mut status: c_int = 0;
            let cpid = self.pid.unwrap() as pid_t;
            match waitpid(cpid, &mut status, 1 as c_int) {
                0 => {} // Nothing returned,
                pid if pid == cpid => {
                    if WIFEXITED(status) {
                        let exit_code = WEXITSTATUS(status);
                        outputln!("{} - process {} died with exit code {}",
                                  self.service_def.ident.name,
                                  pid,
                                  exit_code);
                    } else if WIFSIGNALED(status) {
                        let exit_signal = WTERMSIG(status);
                        outputln!("{} - process {} died with signal {}",
                                  self.service_def.ident.name,
                                  pid,
                                  exit_signal);
                    } else {
                        outputln!("{} - process {} died, but I don't know how.",
                                  self.service_def.ident.name,
                                  pid);
                    }
                    try!(self.transition_to_stopped());
                    outputln!("{} - Service exited", self.service_def.ident.name);
                }
                // ZOMBIES! Bad zombies! We listen for zombies. ZOMBOCOM!
                pid => {
                    if WIFEXITED(status) {
                        let exit_code = WEXITSTATUS(status);
                        debug!("Process {} died with exit code {}", pid, exit_code);
                    } else if WIFSIGNALED(status) {
                        let exit_signal = WTERMSIG(status);
                        debug!("Process {} terminated with signal {}", pid, exit_signal);
                    } else {
                        debug!("Process {} died, but I don't know how.", pid);
                    }
                }
            }
        }
        Ok(())
    }

    fn transition_to_stopped(&mut self) -> Result<()> {
        self.pid = None;
        self.cleanup_pidfile();
        self.state_entered = SteadyTime::now();
        Ok(())
    }

    fn transition_to_started(&mut self) -> Result<()> {
        try!(self.create_pidfile());
        self.starts += 1;
        self.state_entered = SteadyTime::now();
        Ok(())
    }

    pub fn service_dir(&self) -> PathBuf {
        PathBuf::from(&self.exec_ctx.service_root).join("hab-director")
    }


    pub fn pid_file(&self) -> PathBuf {
        let sd = &self.service_def.to_string().replace(".", "-");
        let filename = format!("{}.pid", sd);
        self.service_dir().join("data").join(filename)
    }

    // Create a pid file for a package
    // The existence of this file does not guarantee that a
    // process exists at the PID contained within.
    pub fn create_pidfile(&self) -> Result<()> {
        match self.pid {
            Some(ref pid) => {
                let pid_file = self.pid_file();
                debug!("Creating PID file for child {} -> {:?}",
                       pid_file.display(),
                       pid);
                if let Some(parent) = pid_file.parent() {
                    if let Err(e) = fs::create_dir_all(parent) {
                        // in most cases, the directory already exists
                        debug!("Couldn't make pid directory: {}", e);
                    }
                }
                let mut f = try!(File::create(pid_file));
                try!(write!(f, "{}", pid));
                Ok(())
            }
            None => Ok(()),
        }
    }

    // Remove a pidfile for this package if it exists.
    // Do NOT fail if there is an error removing the PIDFILE
    pub fn cleanup_pidfile(&self) {
        let pid_file = self.pid_file();
        debug!("Attempting to clean up pid file {}", &pid_file.display());
        match fs::remove_file(pid_file) {
            Ok(_) => {
                debug!("Removed pid file");
            }
            Err(e) => {
                debug!("Error removing pidfile: {}, continuing", e);
            }
        };
    }

    // attempt to read the pidfile for this package.
    // If the pidfile does not exist, then return None,
    // otherwise, return Some(pid, uptime_seconds).
    pub fn read_pidfile(&self) -> Result<Option<Pid>> {
        let pid_file = self.pid_file();
        debug!("Reading pidfile {}", &pid_file.display());
        let mut f = try!(File::open(pid_file));
        let mut contents = String::new();
        try!(f.read_to_string(&mut contents));
        debug!("pidfile contents = {}", contents);
        let pid = match contents.parse::<u32>() {
            Ok(pid) => pid,
            Err(e) => {
                debug!("Error reading pidfile: {}", e);
                return Err(Error::DirectorError("Invalid pid file".to_string()));
            }
        };
        Ok(Some(pid))
    }



    /// Send a SIGTERM to a process, wait 8 seconds, then send SIGKILL
    /// shamelessly copied and tweaked from supervisor.rs
    pub fn stop(&mut self) -> Result<()> {
        let wait = match self.pid {
            Some(ref pid) => {
                outputln!("Stopping {}", self.service_def.to_string());
                let _ = signals::send_signal_to_pid(*pid, signals::Signal::SIGTERM);
                true
            }
            None => {
                outputln!("{} already stopped", self.service_def.to_string());
                false
            }
        };
        if wait {
            let stop_time = SteadyTime::now() + Duration::seconds(8);
            loop {
                try!(self.check_process());
                if SteadyTime::now() > stop_time {
                    outputln!("{} process failed to stop with SIGTERM; sending SIGKILL",
                              self.service_def.to_string());
                    if let Some(pid) = self.pid {
                        let _ = signals::send_signal_to_pid(pid, signals::Signal::SIGKILL);
                    }
                    break;
                }
                if self.pid.is_none() {
                    break;
                } else {
                    continue;
                }
            }
        }
        Ok(())
    }
}

impl Drop for Task {
    fn drop(&mut self) {
        outputln!("Killing task {}", &self.service_def.to_string());
        let _ = self.stop();
        let _ = self.cleanup_pidfile();
    }
}

// Consume output from a child process until EOF, then finish
fn child_reader(child: &mut Child, child_name: String) -> Result<()> {
    debug!("Started reader for {}", &child_name);
    let c_stdout = match child.stdout {
        Some(ref mut s) => s,
        None => return Err(Error::DirectorError(format!("Can't read {} stdout", &child_name))),
    };

    let mut reader = BufReader::new(c_stdout);
    let mut buffer = String::new();

    while reader.read_line(&mut buffer).unwrap() > 0 {
        let mut line = output_format!(preamble &child_name, logkey "O");
        line.push_str(&buffer);
        print!("{}", line);
        buffer.clear();
    }
    debug!("child_reader exiting");
    Ok(())
}


#[cfg(test)]
mod tests {
    use std::net::SocketAddrV4;
    use std::path::PathBuf;
    use std::str::FromStr;

    // it's in lib.rs
    use super::super::ServiceDef;

    use super::*;

    fn get_test_dc() -> Task {
        let mut sd = ServiceDef::from_str("core.redis.somegroup.someorg").unwrap();
        sd.cli_args = Some("-v -foo=bar".to_string());
        let exec_ctx = ExecContext::default();
        let exec_params = ExecParams::new(SocketAddrV4::from_str("127.0.0.1:9000").unwrap(),
                                          SocketAddrV4::from_str("127.0.0.1:8000").unwrap(),
                                          None);
        Task::new(exec_ctx, exec_params, sd)
    }

    /// parse args, inject listen-peer and listen-http, no peer
    #[test]
    fn cmd_args_parsing_no_peer() {
        let dc = get_test_dc();
        let args = dc.get_cmd_args().unwrap();
        // core/redis is specified in get_test_dc()
        assert!(args.as_slice() ==
                ["start",
                 "core/redis",
                 "-v",
                 "-foo=bar",
                 "--listen-peer",
                 "127.0.0.1:9000",
                 "--listen-http",
                 "127.0.0.1:8000",
                 "--group",
                 "somegroup",
                 "--org",
                 "someorg"]);
    }

    /// parse args, inject listen-peer, listen-http, peer
    #[test]
    fn cmd_args_parsing_peer() {
        let mut dc = get_test_dc();
        let peer = SocketAddrV4::from_str("127.0.0.1:9876").unwrap();
        // override the test default peer
        dc.exec_params.initial_peer = Some(peer);
        let args = dc.get_cmd_args().unwrap();
        // core / redis is specified in get_test_dc()
        assert!(args.as_slice() ==
                ["start",
                 "core/redis",
                 "-v",
                 "-foo=bar",
                 "--listen-peer",
                 "127.0.0.1:9000",
                 "--listen-http",
                 "127.0.0.1:8000",
                 "--group",
                 "somegroup",
                 "--org",
                 "someorg",
                 "--peer",
                 "127.0.0.1:9876"]);
    }


    /// test the pid filename using the default service directory
    #[test]
    fn pid_file_name_from_default_exec_ctx() {
        let dc = get_test_dc();
        let path = dc.pid_file().clone();
        let path = path.to_str().unwrap();
        assert!("/hab/svc/hab-director/data/core-redis-somegroup-someorg.pid" == path);
    }

    // test the pid filename using a custom service directory
    #[test]
    fn pid_file_name_from_custom_exec_ctx() {
        let mut dc = get_test_dc();
        dc.exec_ctx.service_root = PathBuf::from("/tmp");
        let path = dc.pid_file().clone();
        let path = path.to_str().unwrap();
        println!("[{}]", path);
        assert!("/tmp/hab-director/data/core-redis-somegroup-someorg.pid" == path);
    }
}
